【Python】sqlparseを使ってDDL以外のSQLファイルを抽出する

【Python】sqlparseを使ってDDL以外のSQLファイルを抽出する

Clock Icon2020.03.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の貞松です。

最近、データ分析環境絡みで特定のプロジェクトに含まれるSQLファイルをパースして解析したい要件が発生しました。
本記事では、上記の要件に対応する為に使用したsqlparseというライブラリについてご紹介すると共に、sqlparseを使って、任意のディレクトリ配下のSQLファイルからDDL(CREATE TABLE)でないファイルを抽出する方法を解説します。

sqlparseとは

sqlparseは非検証SQLパーサー(処理対象のSQL文の正当性は検証しない)のpythonライブラリです。

GitHub - andialbrecht/sqlparse: A non-validating SQL parser module for Python

サポートされているPythonバージョンは3.4以降で、インストールはpipで行います。

pip install sqlparse

sqlparseによるSQL文のパースは、sqlparse.parse()にSQL文の文字列を渡すことで実行します。

import sqlparse

query = "select id, name from users where id > 10;"
parsed_queries = sqlparse.parse(query)
print(type(parsed_queries))
print(type(parsed_queries[0]))

# 実行結果
# <class 'tuple'>
# <class 'sqlparse.sql.Statement'>

上記の通り、sqlparse.parse()の戻り値は、sqlparse.sql.Statementというクラスのインスタンスオブジェクトのタプルです。一つのStatementが一つのSQL文に対応しており、引数として渡した文字列内に複数のSQL文が含まれる場合はその分だけStatementが生成されます。
このStatementクラスは、以下のようにクラスを継承しており、TokenクラスはSQL文の1つのワード(最小の分割単位)で、TokenListクラスはその名の通りTokenのリストを持っています。

object
  └ Token
      └ TokenList
          └ Statement

Statementクラスのオブジェクトに対して、flattenメソッドを実行してlist化することで、パースしたSQL文のTokenオブジェクトリストを抽出することができます。

import sqlparse

query = "select id, name from users where id > 10;"
parsed_queries = sqlparse.parse(query)
tokens = list(parsed_queries[0].flatten())
print(tokens)

# 実行結果
# [<DML 'select' at 0x10CA74670>, <Whitespace ' ' at 0x10CA746E0>, <Name 'id' at 0x10CA74750>, <Punctuation ',' at 0x10CA747C0>, <Whitespace ' ' at 0x10CA74830>, <Name 'name' at 0x10CA748A0>, <Whitespace ' ' at 0x10CA74910>, <Keyword 'from' at 0x10CA74600>, <Whitespace ' ' at 0x10CA749F0>, <Name 'users' at 0x10CA74A60>, <Whitespace ' ' at 0x10CA74AD0>, <Keyword 'where' at 0x10CA74B40>, <Whitespace ' ' at 0x10CA74BB0>, <Name 'id' at 0x10CA74C20>, <Whitespace ' ' at 0x10CA74C90>, <Comparison '>' at 0x10CA74D00>, <Whitespace ' ' at 0x10CA74D70>, <Integer '10' at 0x10CA74DE0>, <Punctuation ';' at 0x10CA74E50>]

Tokenオブジェクトは、上記の通り<トークン '値' at アドレス>の形式となっています。
Token.valueでトークンの値、Token.ttypeでトークンタイプを取得することができます。

import sqlparse
 
query = "select id, name from users where id > 10;"
parsed_queries = sqlparse.parse(query)
tokens = list(parsed_queries[0].flatten())

for t in tokens:
    print("token_value: '{}', token_type: {}".format(t.value, t.ttype))

# 実行結果
# token_value: 'select', token_type: Token.Keyword.DML
# token_value: ' ', token_type: Token.Text.Whitespace
# token_value: 'id', token_type: Token.Name
# token_value: ',', token_type: Token.Punctuation
# token_value: ' ', token_type: Token.Text.Whitespace
# token_value: 'name', token_type: Token.Name
# token_value: ' ', token_type: Token.Text.Whitespace
# token_value: 'from', token_type: Token.Keyword
# token_value: ' ', token_type: Token.Text.Whitespace
# token_value: 'users', token_type: Token.Name
# token_value: ' ', token_type: Token.Text.Whitespace
# token_value: 'where', token_type: Token.Keyword
# token_value: ' ', token_type: Token.Text.Whitespace
# token_value: 'id', token_type: Token.Name
# token_value: ' ', token_type: Token.Text.Whitespace
# token_value: '>', token_type: Token.Operator.Comparison
# token_value: ' ', token_type: Token.Text.Whitespace
# token_value: '10', token_type: Token.Literal.Number.Integer
# token_value: ';', token_type: Token.Punctuation

ここから更に条件判定して情報を抽出する(今回のケースではDDLかそうでないかを判定して、DDLでないファイルを抽出する)には、これらのトークンが具体的に何であるか(キーワードなのかコメントなのか、はたまた空白なのか、など)を判定するメソッドや処理の記述を用いて実装していくことになります。

任意のディレクトリ配下のSQLファイルからDDLでないファイルを抽出する

実行環境

  • macOS Catalina 10.15.4
  • Python 3.7.6
  • sqlparse 0.3.1

sqlparseを使用してSQLファイルがDDLか否かを判定する

Tokenクラスにはmatchというメソッドがあり、トークンタイプと値を引数として渡すことで、そのトークンのトークンタイプと値が渡した引数と一致するか否かをboolで返してくれます。
SQLをパースして得られたトークンを順番にmatchで判定し、CREATETABLEのセットが出現するかどうかでこのSQLがDDLか否かを判定します。

import sqlparse

def is_ddl(sql):
    query = sqlparse.parse(sql)
    tokens = list(query[0].flatten())
    is_find_create = False
    is_find_table = False
    for t in tokens:
        # 
        if not is_find_create:
            is_find_create = t.match(sqlparse.tokens.DDL, "CREATE")
        if not is_find_table:
            is_find_table = t.match(sqlparse.tokens.Keyword, "TABLE")

        # "CREATE" と "TABLE" が両方出現した場合
        if is_find_create and is_find_table:
            return True
        # "CREATE" がない状態で、先に"TABLE"が出現した場合
        elif is_find_table:
            return False
    
    return False

# is_ddlの動作を確認
sql_list = [
'''select
  id,
  name
from
  schema.users
where
  id > 10
;''',
'''create table users(
  id integer not null,
  name varchar(50) not null
)
distkey(id)
compound sortkey(id)
;''',
"drop table users;"
]

for sql in sql_list:
    if is_ddl(sql):
        print("This sql is DDL.")
    else:
        print("This sql is not DDL.")

# 実行結果
# This sql is not DDL.
# This sql is DDL.
# This sql is not DDL.

任意のディレクトリ配下のSQLファイルをクロールして判定する

DDL判定処理が出来上がったので、あとは任意のディレクトリ配下のSQLファイルをクロールして、それらを判定にかけるだけです。
globでディレクトリ配下のファイルパスを取得して、拡張子.sqlのファイルに絞り込んだ後、DDL判定にかけてDDL以外のファイル名を出力します。

import glob
import os
import sqlparse
import sys


def is_ddl(sql):
### 中略 ###


# コマンドライン引数に検索したいディレクトリのパス(末尾スラッシュあり)を渡す
args = sys.argv
# ディレクトリ配下のサブディレクトリまで再帰的に取得
path = "{}**".format(args[1])
files = glob.glob(path, recursive=True)
# 拡張子.sqlのファイルのみに絞り込む
sql_files = [f for f in files if os.path.splitext(f)[1] == ".sql"]
# ファイルの中身をDDL判定に通して、DDLでないファイルをリストに詰めて出力する
not_ddl_files = []
for sql_file in sql_files:
    with open(sql_file) as f:
        query = f.read()
    if not is_ddl(query):
        not_ddl_files.append(os.path.basename(sql_file))
print(not_ddl_files)

ファイル抽出するソースコード全体

出来上がったソースコード全体はGistにアップしていますので必要に応じてご参照ください。

import glob
import os
import sqlparse
import sys
def is_ddl(sql):
query = sqlparse.parse(sql)
tokens = list(query[0].flatten())
is_find_create = False
is_find_table = False
for t in tokens:
#
if not is_find_create:
is_find_create = t.match(sqlparse.tokens.DDL, "CREATE")
if not is_find_table:
is_find_table = t.match(sqlparse.tokens.Keyword, "TABLE")
# "CREATE" と "TABLE" が両方出現した場合
if is_find_create and is_find_table:
return True
# "CREATE" がない状態で、先に"TABLE"が出現した場合
elif is_find_table:
return False
return False
# コマンドライン引数に検索したいディレクトリのパス(末尾スラッシュあり)を渡す
args = sys.argv
# ディレクトリ配下のサブディレクトリまで再帰的に取得
path = "{}**".format(args[1])
files = glob.glob(path, recursive=True)
# 拡張子.sqlのファイルのみに絞り込む
sql_files = [f for f in files if os.path.splitext(f)[1] == ".sql"]
# ファイルの中身をDDL判定に通して、DDLでないファイルをリストに詰めて出力する
not_ddl_files = []
for sql_file in sql_files:
with open(sql_file) as f:
query = f.read()
if not is_ddl(query):
not_ddl_files.append(os.path.basename(sql_file))
print(not_ddl_files)

まとめ

SQLパーサーのpythonライブラリであるsqlparseについてご紹介しつつ、sqlparseを使って任意のディレクトリ配下にあるDDLでないSQLファイル一覧を取得する方法を解説しました。
これを応用すれば、SQLファイルからプロジェクトで使用しているテーブルやカラムの一覧を取得する等も可能になり非常に便利なので、さらに活用していきたいと思います。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.